﻿using Confluent.Kafka;
using Microsoft.Extensions.Options;
using Naruto.Subscribe.Provider.Kafka.Consts;
using Naruto.Subscribe.Provider.Kafka.Extension;
using Naruto.Subscribe.Provider.Kafka.Interface;
using Naruto.Subscribe.Provider.Kafka.Options;
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Text;
using System.Threading;

namespace Naruto.Subscribe.Provider.Kafka.Internal
{
    /// <summary>
    /// Default implementation of the producer connection pool.
    /// Idle Kafka producers are kept in a bounded FIFO queue so they can be reused
    /// instead of being rebuilt for every publish.
    /// </summary>
    public class ProducerConnectionPool : IProducerConnectionPool
    {
        /// <summary>
        /// Maximum number of idle producers the pool is allowed to hold.
        /// </summary>
        private readonly int _producerConnectionPoolSize;

        /// <summary>
        /// Number of producers currently counted as being in the pool.
        /// Only modified through <see cref="Interlocked"/> operations.
        /// </summary>
        private int _currentSize;

        /// <summary>
        /// Non-zero once the pool has been disposed. Accessed via <see cref="Volatile"/>.
        /// </summary>
        private int _disposed;

        /// <summary>
        /// Queue holding the idle producers.
        /// </summary>
        private readonly ConcurrentQueue<IProducer<string, byte[]>> _producerConnectionPool;

        /// <summary>
        /// Kafka configuration (broker list, pool size).
        /// </summary>
        private readonly IOptions<KafkaOption> _options;

        /// <summary>
        /// Creates an empty pool. A configured pool size of zero or less falls back to
        /// <c>KafkaConst.DefaultProducerConnectionPoolSize</c>.
        /// </summary>
        /// <param name="options">Kafka options supplying the broker list and the pool size.</param>
        public ProducerConnectionPool(IOptions<KafkaOption> options)
        {
            _options = options;
            _producerConnectionPoolSize = options.Value.ProducerConnectionPoolSize <= 0 ? KafkaConst.DefaultProducerConnectionPoolSize : options.Value.ProducerConnectionPoolSize;
            _currentSize = 0;
            _producerConnectionPool = new ConcurrentQueue<IProducer<string, byte[]>>();
        }

        /// <summary>
        /// Disposes every idle producer held by the pool and marks the pool as closed.
        /// </summary>
        public void Dispose()
        {
            Disopse(true);
            GC.SuppressFinalize(this);
        }

        /// <summary>
        /// Releases the pooled producers.
        /// The (misspelled) name is kept because it is a protected virtual member that
        /// derived classes may already override.
        /// </summary>
        /// <param name="isDispose"><c>true</c> to release the pooled producers; <c>false</c> does nothing.</param>
        protected virtual void Disopse(bool isDispose)
        {
            if (!isDispose)
            {
                return;
            }

            // Mark the pool closed first so that Return() stops adding new entries.
            Volatile.Write(ref _disposed, 1);
            DrainPool();
        }

        /// <summary>
        /// Dequeues and disposes every producer currently in the queue, decrementing the
        /// counter once per removed entry so it stays consistent with concurrent Rent/Return calls.
        /// </summary>
        private void DrainPool()
        {
            while (_producerConnectionPool.TryDequeue(out var connection))
            {
                Interlocked.Decrement(ref _currentSize);
                connection?.Dispose();
            }
        }

        /// <summary>
        /// Rents a producer: reuses an idle one from the pool when available,
        /// otherwise builds a new one from the configured brokers.
        /// </summary>
        /// <returns>A producer; hand it back through <see cref="Return"/> when finished.</returns>
        public IProducer<string, byte[]> Rent()
        {
            // Reuse an idle producer if there is one.
            if (_producerConnectionPool.TryDequeue(out var producer))
            {
                Interlocked.Decrement(ref _currentSize);
                return producer;
            }

            // Pool is empty: build the configuration for a new producer.
            var producerConfig = new ProducerConfig(new Dictionary<string, string>())
            {
                BootstrapServers = _options.Value.BrokersServices.ToBrokersService(),
                QueueBufferingMaxMessages = 10,
                MessageTimeoutMs = 5000,
                RequestTimeoutMs = 3000
            };

            // Create the producer.
            return new ProducerBuilder<string, byte[]>(producerConfig).Build();
        }

        /// <summary>
        /// Returns a producer to the pool. When it cannot be pooled (pool full or pool
        /// already disposed) the producer is disposed instead.
        /// </summary>
        /// <param name="producer">The producer to hand back; <c>null</c> is ignored.</param>
        /// <returns><c>true</c> if the producer was stored for reuse; otherwise <c>false</c>.</returns>
        public bool Return(IProducer<string, byte[]> producer)
        {
            // Never store null: a later Rent() would hand it out as if it were a live producer.
            if (producer == null)
            {
                return false;
            }

            // A disposed pool is never drained again, so pooling here would leak the producer.
            if (Volatile.Read(ref _disposed) != 0)
            {
                producer.Dispose();
                return false;
            }

            // Reserve a slot; if that exceeds the limit, undo the reservation and drop the producer.
            if (Interlocked.Increment(ref _currentSize) > _producerConnectionPoolSize)
            {
                Interlocked.Decrement(ref _currentSize);
                producer.Dispose();
                return false;
            }

            _producerConnectionPool.Enqueue(producer);

            // Dispose() may have run between the check above and the enqueue;
            // drain again so nothing is left behind in a closed pool.
            if (Volatile.Read(ref _disposed) != 0)
            {
                DrainPool();
                return false;
            }

            return true;
        }
    }
}
